/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package com.alibaba.nacossync.extension.impl;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import com.alibaba.nacossync.constant.MetricsStatisticsType;
import com.alibaba.nacossync.constant.SkyWalkerConstants;
import com.alibaba.nacossync.extension.SyncService;
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
import com.alibaba.nacossync.extension.eureka.EurekaNamingService;
import com.alibaba.nacossync.extension.event.SpecialSyncEventBus;
import com.alibaba.nacossync.extension.holder.EurekaServerHolder;
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
import com.alibaba.nacossync.monitor.MetricsManager;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.util.NacosUtil;
import com.google.common.collect.Lists;
import com.netflix.appinfo.DataCenterInfo;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.appinfo.LeaseInfo;
import com.netflix.appinfo.MyDataCenterInfo;

import lombok.extern.slf4j.Slf4j;

/**
 * Synchronizes service instances from a Nacos cluster (source) to a Eureka cluster (destination).
 *
 * <p>A task's service name may be a single name, a comma separated list of names, or
 * {@link SkyWalkerConstants#STAR}, in which case every service name returned by
 * {@link NacosUtil#getAllNacosServiceNameList} for the task's group is used. For each resolved service a
 * Nacos {@link EventListener} is registered and remembered in {@code nacosListenerMap} under the key
 * {@code <taskId>_<serviceName>}.
 *
 * @author zhanglong
 * @author fenglibin
 */
@Slf4j
@NacosSyncService(sourceCluster = ClusterTypeEnum.NACOS, destinationCluster = ClusterTypeEnum.EUREKA)
public class NacosSyncToEurekaServiceImpl implements SyncService {

    /** Separator placed between the task id and the service name when building a listener key. */
    private static final String LISTENER_KEY_SEPARATOR = "_";

    /** Registered Nacos listeners, keyed by {@code <taskId>_<serviceName>}. */
    private final Map<String, EventListener> nacosListenerMap = new ConcurrentHashMap<>();

    private final MetricsManager metricsManager;
    private final SkyWalkerCacheServices skyWalkerCacheServices;
    private final NacosServerHolder nacosServerHolder;
    private final EurekaServerHolder eurekaServerHolder;
    private final SpecialSyncEventBus specialSyncEventBus;

    @Autowired
    public NacosSyncToEurekaServiceImpl(MetricsManager metricsManager, SkyWalkerCacheServices skyWalkerCacheServices,
                                        NacosServerHolder nacosServerHolder, EurekaServerHolder eurekaServerHolder,
                                        SpecialSyncEventBus specialSyncEventBus) {
        this.metricsManager = metricsManager;
        this.skyWalkerCacheServices = skyWalkerCacheServices;
        this.nacosServerHolder = nacosServerHolder;
        this.eurekaServerHolder = eurekaServerHolder;
        this.specialSyncEventBus = specialSyncEventBus;
    }

    /**
     * Stops a sync task: removes it from the special sync event bus and unsubscribes the Nacos listener of
     * every service the task currently resolves to. Instances already registered in Eureka are left in place.
     *
     * @param taskDO the task to stop
     * @return {@code true} when the task was stopped, {@code false} when an unexpected exception occurred
     */
    @Override
    public boolean stop(TaskDO taskDO) {
        try {
            specialSyncEventBus.unsubscribe(taskDO);

            NamingService sourceNamingService =
                    nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());

            for (String serviceName : resolveServiceNames(taskDO, sourceNamingService)) {
                unsubscribeService(copyForService(taskDO, serviceName), sourceNamingService);
            }

            log.info("Stop sync taskId: {} successfully, service name is:{} succcessfully.", taskDO.getTaskId(),
                    taskDO.getServiceName());
            return true;
        } catch (Exception e) {
            log.error("stop a task from Nacos to Eureka was failed, taskId:{}, taskName:{}, error:{}",
                    taskDO.getTaskId(), taskDO.getServiceName(), e.getMessage(), e);
            return false;
        }
    }

    /**
     * Deletes a sync task: removes it from the special sync event bus, unsubscribes the Nacos listeners and
     * deregisters from Eureka every instance for which {@code needDelete} returns {@code true}.
     *
     * @param taskDO the task to delete
     * @return {@code true} on success; {@code false} (after recording a {@code DELETE_ERROR} metric) on failure
     */
    @Override
    public boolean delete(TaskDO taskDO) {
        try {
            specialSyncEventBus.unsubscribe(taskDO);
            NamingService sourceNamingService =
                    nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());
            EurekaNamingService destNamingService =
                    eurekaServerHolder.get(taskDO.getDestClusterId(), taskDO.getNameSpace());

            for (String serviceName : resolveServiceNames(taskDO, sourceNamingService)) {
                TaskDO serviceTaskDO = copyForService(taskDO, serviceName);
                unsubscribeService(serviceTaskDO, sourceNamingService);
                removeSyncedInstances(serviceTaskDO, destNamingService);
            }
            log.info("Delete sync taskId: {} successfully, service name is:{}.", taskDO.getTaskId(),
                    taskDO.getServiceName());
        } catch (Exception e) {
            log.error("delete task from Nacos to Eureka was failed, taskId:{}", taskDO.getTaskId(), e);
            metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
            return false;
        }
        return true;
    }

    /**
     * Starts (or refreshes) a sync task. Every resolved service gets a forced full reconciliation the first
     * time it is seen and a Nacos subscription for subsequent changes. Wildcard tasks additionally register
     * this method with the special sync event bus.
     *
     * @param taskDO the task to synchronize
     * @return {@code true} on success; {@code false} (after recording a {@code SYNC_ERROR} metric) on failure
     */
    @Override
    public boolean sync(TaskDO taskDO) {
        try {
            NamingService sourceNamingService =
                    nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());
            EurekaNamingService destNamingService =
                    eurekaServerHolder.get(taskDO.getDestClusterId(), taskDO.getNameSpace());

            // Subscribe to changes of every service covered by the task.
            for (String serviceName : resolveServiceNames(taskDO, sourceNamingService)) {
                subscribeService(copyForService(taskDO, serviceName), sourceNamingService, destNamingService);
            }

            if (SkyWalkerConstants.STAR.equals(taskDO.getServiceName())) {
                specialSyncEventBus.subscribe(taskDO, this::sync);
            }
        } catch (Exception e) {
            // This class syncs Nacos -> Eureka; the message used to state the opposite direction.
            log.error("sync task from Nacos to Eureka was failed, taskId:{}", taskDO.getTaskId(), e);
            metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
            return false;
        }
        return true;
    }

    /**
     * Resolves the service names covered by a task.
     *
     * <p>For a wildcard task the names come from Nacos; otherwise the task's service name is split on
     * {@link SkyWalkerConstants#COMMA}, each token is trimmed and blank tokens are skipped so that values
     * such as {@code "a, b"} or {@code "a,,b"} do not produce blank or padded service names.
     *
     * @param taskDO              the task whose service name expression is resolved
     * @param sourceNamingService the Nacos naming service used for wildcard resolution
     * @return a mutable set of service names, possibly empty
     * @throws Exception whatever the Nacos service list lookup throws; callers handle it at the task boundary
     */
    private static Set<String> resolveServiceNames(TaskDO taskDO, NamingService sourceNamingService)
        throws Exception {
        Set<String> serviceNames = new HashSet<>();
        if (SkyWalkerConstants.STAR.equals(taskDO.getServiceName())) {
            serviceNames.addAll(NacosUtil.getAllNacosServiceNameList(taskDO.getGroupName(), sourceNamingService));
            return serviceNames;
        }
        List<String> configuredNames = Lists.newArrayList(taskDO.getServiceName().split(SkyWalkerConstants.COMMA));
        for (String configuredName : configuredNames) {
            String trimmedName = configuredName.trim();
            if (!trimmedName.isEmpty()) {
                serviceNames.add(trimmedName);
            }
        }
        return serviceNames;
    }

    /**
     * Creates a copy of {@code taskDO} whose service name is replaced by {@code serviceName}.
     */
    private static TaskDO copyForService(TaskDO taskDO, String serviceName) {
        TaskDO serviceTaskDO = new TaskDO();
        BeanUtils.copyProperties(taskDO, serviceTaskDO);
        serviceTaskDO.setServiceName(serviceName);
        return serviceTaskDO;
    }

    /**
     * Builds the {@code nacosListenerMap} key for one service of a task.
     */
    private static String buildListenerKey(TaskDO taskDO, String serviceName) {
        return taskDO.getTaskId() + LISTENER_KEY_SEPARATOR + serviceName;
    }

    /**
     * Performs the first-time forced reconciliation (when no listener is registered yet) and subscribes the
     * listener of the given per-service task to Nacos.
     */
    private void subscribeService(TaskDO serviceTaskDO, NamingService sourceNamingService,
        EurekaNamingService destNamingService) {
        String serviceName = serviceTaskDO.getServiceName();
        String listenerKey = buildListenerKey(serviceTaskDO, serviceName);

        EventListener candidate =
            event -> processNamingEvent(serviceTaskDO, sourceNamingService, destNamingService, event, false);
        // putIfAbsent decides atomically whether this is the first registration for the key.
        EventListener existing = nacosListenerMap.putIfAbsent(listenerKey, candidate);
        // Keep a local reference: a concurrent stop/delete may remove the map entry at any time.
        EventListener listener = existing != null ? existing : candidate;

        if (existing == null) {
            // First registration for this service: push the current Nacos state to Eureka right away.
            processNamingEvent(serviceTaskDO, sourceNamingService, destNamingService, null, true);
        }

        try {
            sourceNamingService.subscribe(serviceName, serviceTaskDO.getGroupName(), listener);
        } catch (NacosException e) {
            log.error("Subscribe from Nacos for service [{}] exception happened:{}", serviceName, e.getMessage(), e);
        }
    }

    /**
     * Unsubscribes the listener registered for the given per-service task, if any, and forgets it.
     *
     * <p>The map entry is removed only after a successful unsubscribe so that a failed attempt can be retried
     * with the same listener instance. Removing the entry also lets a later {@link #sync(TaskDO)} of the same
     * task run its initial forced reconciliation again.
     */
    private void unsubscribeService(TaskDO serviceTaskDO, NamingService sourceNamingService) {
        String serviceName = serviceTaskDO.getServiceName();
        String listenerKey = buildListenerKey(serviceTaskDO, serviceName);
        EventListener listener = nacosListenerMap.get(listenerKey);
        if (listener == null) {
            return;
        }
        try {
            sourceNamingService.unsubscribe(serviceName, serviceTaskDO.getGroupName(), listener);
            nacosListenerMap.remove(listenerKey, listener);
        } catch (NacosException e) {
            log.error("Unsubscribe service [{}] from Nacos exception happened:{}", serviceName, e.toString(), e);
        }
    }

    /**
     * Deregisters from Eureka every instance of the task's service for which {@code needDelete} is true.
     */
    private void removeSyncedInstances(TaskDO serviceTaskDO, EurekaNamingService destNamingService) {
        List<InstanceInfo> allInstances = destNamingService.getApplication(serviceTaskDO.getServiceName());
        if (allInstances == null) {
            return;
        }
        for (InstanceInfo instance : allInstances) {
            if (needDelete(instance.getMetadata(), serviceTaskDO)) {
                destNamingService.deregisterInstance(instance);
            }
        }
    }

    /**
     * Reconciles Eureka with the current Nacos instance list of the task's service.
     *
     * <p>Runs when {@code event} is a {@link NamingEvent} or when {@code force} is set; any failure is logged
     * and recorded as a {@code SYNC_ERROR} metric instead of being propagated to the Nacos callback thread.
     *
     * @param taskDO              per-service task
     * @param sourceNamingService Nacos naming service to read instances from
     * @param destNamingService   Eureka naming service to write instances to
     * @param event               the Nacos event that triggered the call, may be {@code null} when forced
     * @param force               {@code true} to reconcile regardless of the event type
     */
    private void processNamingEvent(TaskDO taskDO, NamingService sourceNamingService,
        EurekaNamingService destNamingService, Event event, boolean force) {
        if (!force && !(event instanceof NamingEvent)) {
            return;
        }
        try {
            Set<String> instanceKeySet = new HashSet<>();
            List<Instance> sourceInstances =
                sourceNamingService.getAllInstances(taskDO.getServiceName(), taskDO.getGroupName());
            // Register everything that currently exists in Nacos first ...
            addAllNewInstance(taskDO, destNamingService, instanceKeySet, sourceInstances);
            // ... then drop what no longer exists there.
            ifNecessaryDelete(taskDO, sourceNamingService, destNamingService, instanceKeySet);
        } catch (Exception e) {
            log.error("event process fail, taskId:{}, error:{}", taskDO.getTaskId(), e.toString(), e);
            metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
        }
    }

    /**
     * Registers in Eureka every Nacos instance accepted by {@code needSync} and records its
     * {@code ip:port} key.
     *
     * @param taskDO            per-service task
     * @param destNamingService Eureka naming service to register with
     * @param instanceKeySet    output set receiving the {@code ip:port} key of each registered instance
     * @param sourceInstances   instances currently present in Nacos
     */
    private void addAllNewInstance(TaskDO taskDO, EurekaNamingService destNamingService, Set<String> instanceKeySet,
        List<Instance> sourceInstances) {
        for (Instance instance : sourceInstances) {
            if (!needSync(instance.getMetadata(), taskDO)) {
                continue;
            }
            log.info("Begin to register new node to Eureka, serviceName:{}, ip:{}, port:{}",
                instance.getServiceName(), instance.getIp(), instance.getPort());
            destNamingService.registerInstance(buildSyncInstance(instance, taskDO));
            instanceKeySet.add(composeInstanceKey(instance.getIp(), instance.getPort()));
        }
    }

    /**
     * Removes from Eureka the instances of the task's service that no longer exist in Nacos.
     *
     * <p>An Eureka instance is a removal candidate when {@code needDelete} accepts it and its {@code ip:port}
     * key is not in {@code instanceKeySet}. Before deregistering, Nacos is queried once more to confirm that
     * no instance with the same ip and port exists.
     *
     * @param taskDO              per-service task
     * @param sourceNamingService Nacos naming service used for the confirmation lookup
     * @param destNamingService   Eureka naming service to deregister from
     * @param instanceKeySet      {@code ip:port} keys of the instances just registered from Nacos
     * @throws NacosException when the confirmation lookup against Nacos fails
     */
    private void ifNecessaryDelete(TaskDO taskDO, NamingService sourceNamingService,
        EurekaNamingService destNamingService, Set<String> instanceKeySet) throws NacosException {
        List<InstanceInfo> allInstances = destNamingService.getApplication(taskDO.getServiceName());
        if (allInstances == null) {
            return;
        }
        for (InstanceInfo instance : allInstances) {
            String instanceKey = composeInstanceKey(instance.getIPAddr(), instance.getPort());
            if (!needDelete(instance.getMetadata(), taskDO) || instanceKeySet.contains(instanceKey)) {
                continue;
            }
            // Confirm against Nacos that the provider is really gone before removing it from Eureka.
            List<Instance> nacosInstanceList =
                sourceNamingService.getAllInstances(taskDO.getServiceName(), taskDO.getGroupName());
            // Comparing composed keys avoids a NullPointerException when either side has no IP.
            Optional<Instance> stillInNacos = nacosInstanceList.stream()
                .filter(candidate -> instanceKey.equals(composeInstanceKey(candidate.getIp(), candidate.getPort())))
                .findAny();
            if (!stillInNacos.isPresent()) {
                log.info("Delete provider instance from Eureka which not exists in Nacos, service name:{},ip:{},port:{}",
                    taskDO.getServiceName(), instance.getIPAddr(), instance.getPort());
                destNamingService.deregisterInstance(instance);
            }
        }
    }

    /**
     * Builds the {@code ip:port} key used to match Nacos and Eureka instances.
     */
    private String composeInstanceKey(String ip, int port) {
        return ip + ":" + port;
    }

    /**
     * Converts a Nacos instance into the Eureka {@link InstanceInfo} registered for the task's service.
     *
     * <p>The metadata carries the destination cluster id, the source cluster type code and the source cluster
     * id, followed by all metadata of the Nacos instance.
     *
     * @param instance the Nacos instance to convert
     * @param taskDO   per-service task supplying the service name and cluster ids
     * @return the Eureka instance description, with status {@code UP}
     */
    private InstanceInfo buildSyncInstance(Instance instance, TaskDO taskDO) {
        DataCenterInfo dataCenterInfo = new MyDataCenterInfo(DataCenterInfo.Name.MyOwn);
        final Map<String, String> instanceMetadata = instance.getMetadata();
        HashMap<String, String> metadata = new HashMap<>(16);
        metadata.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
        metadata.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
            skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
        metadata.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
        // NOTE(review): source metadata is copied last, so a source entry that uses one of the three keys
        // above overwrites the sync marker - confirm this precedence is intended.
        metadata.putAll(instanceMetadata);
        String homePageUrl = obtainHomePageUrl(instance, instanceMetadata);
        String serviceName = taskDO.getServiceName();

        return new InstanceInfo(
                instance.getIp() + ":" + serviceName + ":" + instance.getPort(),
                serviceName,
                null,
                instance.getIp(),
                null,
                new InstanceInfo.PortWrapper(true, instance.getPort()),
                null,
                homePageUrl + "/actuator/env",
                homePageUrl + "/actuator/info",
                homePageUrl + "/actuator/health",
                null,
                serviceName,
                serviceName,
                1,
                dataCenterInfo,
                instance.getIp(),
                InstanceInfo.InstanceStatus.UP,
                InstanceInfo.InstanceStatus.UNKNOWN,
                null,
                // Presumably a 30s renewal interval and a 90s lease duration - verify against LeaseInfo.
                new LeaseInfo(30, 90,
                        0L, 0L, 0L, 0L, 0L),
                false,
                metadata,
                System.currentTimeMillis(),
                System.currentTimeMillis(),
                null,
                null
        );
    }

    /**
     * Builds the base URL {@code http://<ip>:<managementPort><managementContextPath>} of an instance. The
     * management port falls back to the instance port when the metadata does not define one.
     */
    private String obtainHomePageUrl(Instance instance, Map<String, String> instanceMetadata) {
        final String managementContextPath = obtainManagementContextPath(instanceMetadata);
        final String managementPort = instanceMetadata.getOrDefault(SkyWalkerConstants.MANAGEMENT_PORT_KEY,
            String.valueOf(instance.getPort()));
        return String.format("http://%s:%s%s", instance.getIp(), managementPort, managementContextPath);
    }

    /**
     * Reads the management context path from the metadata (empty when absent) and strips one trailing slash.
     */
    private String obtainManagementContextPath(Map<String, String> instanceMetadata) {
        final String path = instanceMetadata.getOrDefault(SkyWalkerConstants.MANAGEMENT_CONTEXT_PATH_KEY, "");
        if (path.endsWith("/")) {
            return path.substring(0, path.length() - 1);
        }
        return path;
    }
}
